[iceberg] support migrate iceberg table suffering schema evolution#5078
LsomeYeah wants to merge 13 commits into apache:master
Conversation
…g to paimon # Conflicts: # paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
…g table in hive is external
…mon table to source table if migrating success [core] alter access permission of getDataTypeFromType()
# Conflicts: # paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
…IcebergTableProcedureITCase
…iding dependency conflicts
private final InternalMap lowerBounds;
private final InternalMap upperBounds;

// only used for iceberg migrate
I think it is better: "only used for migrate iceberg table to paimon with schema evolution".
Before migration, we do not check whether the Iceberg table has undergone schema evolution; schemaId will be used in all cases for Iceberg migration.
// get iceberg current schema
IcebergSchema icebergSchema =
        icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
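The hunk above replaces reading only the current schema with a method that converts every schema in the Iceberg metadata, so data files written under any historical schema can still be interpreted. A minimal self-contained sketch of that idea (the record types below are simplified stand-ins, not Paimon's actual `IcebergSchema`/`TableSchema` classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SchemaConversionSketch {
    // Simplified stand-ins for the real metadata classes.
    record IcebergSchema(int schemaId, List<String> fields) {}
    record TableSchema(long id, List<String> fields) {}

    // Convert every historical schema, keyed by schema id, rather than
    // only the current one.
    static List<TableSchema> icebergSchemasToPaimonSchemas(
            Map<Integer, IcebergSchema> schemas) {
        List<TableSchema> result = new ArrayList<>();
        // TreeMap orders conversion by ascending schema id.
        for (IcebergSchema s : new TreeMap<>(schemas).values()) {
            result.add(new TableSchema(s.schemaId(), s.fields()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, IcebergSchema> schemas = Map.of(
                0, new IcebergSchema(0, List.of("id", "name")),
                1, new IcebergSchema(1, List.of("id", "name", "age")));
        // Both schemas are converted, not just the current one.
        System.out.println(icebergSchemasToPaimonSchemas(schemas).size());
    }
}
```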
    }
}

public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
String targetDatabase,
String targetTableName,
Integer parallelism,
Map<String, String> options,
        ParameterUtils.parseCommaSeparatedKeyValues(properties))
        .executeMigrate();
        ParameterUtils.parseCommaSeparatedKeyValues(properties));
LOG.info("create migrator success.");
I think it is better: "create migrator success and begin executeMigrate".
Map<String, String> catalogConfig,
String icebergProperties,
String tableProperties,
Integer parallelism) {
@@ -0,0 +1,90 @@
/*
Should we also modify the docs of these actions and procedures in this PR?
Sorry, the content about hive-catalog and the procedure is contained in #4878. This PR should not contain commits that have already been merged, so I'll close this PR and create a new one. The doc for Iceberg migration will be submitted later in a separate PR.
public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
    @Override
    public String identifier() {
        return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";
No need to call toString(); string concatenation does that implicitly.
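The reviewer's point holds because Java string concatenation already performs string conversion (via `String.valueOf`, which calls `toString()`) on the operand, so the explicit call is redundant. A tiny standalone illustration (the enum below is a simplified stand-in, not Paimon's actual `IcebergOptions.StorageType`; its lowercase `toString()` is an assumption for the example):

```java
public class EnumConcatSketch {
    enum StorageType {
        HIVE_CATALOG;

        @Override
        public String toString() {
            // Stand-in behavior: lowercase the enum constant name.
            return name().toLowerCase();
        }
    }

    public static void main(String[] args) {
        // Concatenation invokes toString() implicitly,
        // so both lines print the same string.
        System.out.println(StorageType.HIVE_CATALOG.toString() + "_migrate");
        System.out.println(StorageType.HIVE_CATALOG + "_migrate");
    }
}
```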
/** Factory to create {@link IcebergMigrateHiveMetadata}. */
public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
    @Override
public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);

    public static final String TABLE_TYPE_PROP = "table_type";
Purpose
Linked issue: close #xxx
In PRs #4639 and #4878, we supported migrating Iceberg tables managed by hadoop-catalog or hive-catalog to Paimon. This PR aims to support migrating Iceberg tables that have undergone one or more rounds of schema evolution.
Paimon stores the schema-id in each DataFileMeta so it can read data files written under older schemas. We therefore extract the schema-id used by each Iceberg data file and record it in the corresponding Paimon DataFileMeta, which enables Paimon to handle the schema evolution case.
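The mechanism described above can be sketched as follows: each data file entry records the schema id it was written with, and a reader resolves columns through the matching historical schema. The types here are simplified stand-ins for illustration, not Paimon's actual `DataFileMeta`:

```java
import java.util.List;
import java.util.Map;

public class SchemaEvolutionSketch {
    // Stand-in for a data file entry carrying the schema id it was written with.
    record DataFileMeta(String path, long schemaId) {}

    public static void main(String[] args) {
        // Historical schemas keyed by schema id:
        // a column "age" was added in schema 1.
        Map<Long, List<String>> schemasById = Map.of(
                0L, List.of("id", "name"),
                1L, List.of("id", "name", "age"));

        List<DataFileMeta> files = List.of(
                new DataFileMeta("f0.parquet", 0L),
                new DataFileMeta("f1.parquet", 1L));

        // A reader picks the schema matching each file's recorded schema id,
        // so files written before the evolution are still read correctly.
        for (DataFileMeta f : files) {
            System.out.println(f.path() + " -> " + schemasById.get(f.schemaId()));
        }
    }
}
```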
Tests
IcebergMigrateTest#testDeleteColumn
IcebergMigrateTest#testRenameColumn
IcebergMigrateTest#testAddColumn
IcebergMigrateTest#testReorderColumn
IcebergMigrateTest#testUpdateColumn
IcebergMigrateTest#testMigrateWithRandomIcebergEvolution
API and Format
Documentation